并行流的使用偶现ConcurrentModificationException 异常

问题描述

今天排查线上偶现的问题,查日志是并发修改集合的异常:
image-20210114144351757

问题代码:

1
2
3
4
5
Map<String, Object> context = ContextUtil.getAll();
req.getLines().parallelStream().forEach(lineNo -> {
ContextUtil.putAll(context);
baseWave.occupyLine(warehouseCode, lineNo, req.getPlanDate());
});

其中ContextUtil 是 由ThreadLocal实现的 保存上下文的工具类

简化了大部分代码,大概这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class ContextUtil {
public static final ThreadLocal<Map<String, Object>> CONTEXT = ThreadLocal.withInitial(() -> {
Map<String, Object> map = new HashMap(10);
return map;
});
private ContextUtil() {
}
public static void put(String key, Object value) {
if (value != null) {
((Map)CONTEXT.get()).put(key, value);
}
}
public static void putAll(Map<String, Object> map) {
if (map != null) {
map.forEach(ContextUtil::put);
}
}
public static Map<String, Object> getAll() {
return (Map)CONTEXT.get();
}


}

解决过程

1.看到这个问题,首先想到的是并行流,多线程去修改了context (HashMap)这个集合

2.看代码发现ContextUtil 这里的实现是用的 ThreadLocal 这里不应该出现并发问题

3.跟踪代码发现在发送http请求的时候也会去设置 Context

image-20210103195300233

看到这里,当时的出来的结论还是,不应该啊,我是ThreadLocal,why???

4.于是在本地写main方法,模拟上面的过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static void main(String[] args) throws InterruptedException {
List<String> list = Lists.newArrayList();
for (int i = 0; i < 1000; i++) {
list.add(RandomStringUtils.random(5));
ContextUtil.put("test" + i, "tes1");

}
Map<String, Object> context = ContextUtil.getAll();
for (int i = 0; i < 100; i++) {
list.parallelStream().forEach(lineNo -> {
ContextUtil.putAll(context);
ContextUtil.put("newTest", "tes11");
});
}

在模拟的过程中发现:

1.ContextUtil 中放入的值越多,出现异常的概率越大

2.如果不设置新的值不会出现异常( ContextUtil.put("newTest", "tes11"); 如果没有这段代码)

3.如果使用线程池,不使用并行流,不会出现异常,比如下面的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
List<String> list =  Lists.newArrayList();
for (int i = 0; i < 1000; i++) {
list.add(RandomStringUtils.random(5));
ContextUtil.put("test" + i, "tes1");
}
Map<String, Object> context = ContextUtil.getAll();
ExecutorService executorService= Executors.newFixedThreadPool(4);
for (int i = 0; i < 100; i++) {
list.forEach(x->{
executorService.execute(()->{
ContextUtil.putAll(context);
ContextUtil.put("newTest", "tes11");
System.out.println(x);
});
});
}

executorService.shutdown();
executorService.awaitTermination(20,TimeUnit.SECONDS);

分析

并行流与线程池方式的差别:

并行流采用的是 fork/join 方式实现

image-20210103221247102

与线程池的区别就是,fork /join 主线程会参与任务的执行,而线程池中,主线程只管提交任务,而任务的执行是由线程池中的线程。

再看代码。ContextUtil.getAll() 方法没有使用深拷贝。

image-20210103221953182

问题就出现在这里,在并行流中,可能其他fork出来的子线程在 foreach 遍历主线程的上下文map,而主线程可能正在修改这个map

所以报错。

这样就解释了 ContextUtil 中放入的值越多,出现异常的概率越大,因为值越多,遍历的时间越长,冲突的机率越大。

同样也解释了如果使用线程池,不使用并行流,不会出现异常,因为主线程不参与任务的执行,修改的map 都是各自线程中threadLocal中的Map,不会有冲突。

但是无法解释 如果不设置新的值不会出现异常 ContextUtil.put("newTest", "tes11");;

看hashMap 的代码

image-20210103223430167

在遍历方法的前后 会判断 modCount值是否相等,否则跑出异常

而put 方法,只会在新加值的时候才会修改这个值

image-20210103224238855

所以一切都得到了解释。

解决方案

  1. 使用深拷贝,这样遍历的map 和修改的map不是同一个,不会出现问题
1
2
3
public static Map<String, Object> getAll() {
return new HashMap<>((Map)CONTEXT.get());
}
  1. 使用线程池,而不是并行流

    1
    2
    3
    4
    5
    6
    CompletableFuture<?>[] futures = req.getLines().stream().map(lineNo ->
    CompletableFuture.runAsync(() -> {
    baseWave.occupyLine(warehouseCode, lineNo, req.getPlanDate());
    },insideTaskExecutor)
    ).toArray(CompletableFuture[]::new);
    CompletableFuture.allOf(futures).join();

这里建议采用方案2,应为这里的代码需求应该复制主线程上下文让子线程使用,防止traceId,cityId 等信息断裂,而使用并行流,主线程参与任务,这样可能会修改上下文信息,后面可能有更大更难发现的坑。
注意
这里最后的方案采用的是改造过的线程池,用于自动传递上下文,改造方法参考阿里的TransmittableThreadLocal,执行方法前,复制父线程的上下文信息,执行之后恢复现场